一個服務發出訊息之後, 可以由多個服務分別註冊多個channel來監聽, 同一個TOPIC底下的每個channel都會拿到一樣的訊息。
當後端的溝通都是透過NSQ訊號的時候, 流程上每個環節的每個動作都可以拆分出來, 每個Consumer都可以接收到第一手的資料, 不需要再經過其他服務的傳遞。 這樣可以減少資料傳遞所消耗的時間、避免資料內容缺失、需要調整資料內容時也不用再大家都改, 只要源頭修改好了, 大家都會拿到一樣的資訊。
但Consumer收到訊息之後可能會做很多複雜的處理, 所以Consumer也需要graceful shutdown。
以下程式大概會是這樣:
建立Consumer
    // 建立空白設定檔。
    ConsumerConfig := nsq.NewConfig()
    // 設置重連時間
    ConsumerConfig.LookupdPollInterval = time.Second * 2
    consumer, _ := nsq.NewConsumer("COCONUT_UPDATE_POINT", "coconut", ConsumerConfig)
    consumer.AddConcurrentHandlers(TestNSQConsumer(), config.NsqConsumerWorkers)
    err = consumer.ConnectToNSQLookupd(config.NsqLookupdAddr)
    if err != nil {
        return
    }
AddConcurrentHandlers 是設定要開幾個worker處理, 如果會同時收大批訊息就要測試worker設定的合理值才能在時間內收下所有資料。
訊息進來後由TestNSQConsumer()負責收下來處理, message.DisableAutoResponse() & defer message.Finish()是不要再讓訊息Requeue回去, 這樣可能會發生處理到一半的狀態, 這邊要看使用的情境, 如果是需要訊息重發的可以使用Requeue, 可以參考官方的test case。
func TestNSQConsumer() nsq.Handler {
   return nsq.HandlerFunc(func(message *nsq.Message) (err error) {
       message.DisableAutoResponse()
       defer message.Finish()
       log.Printf("========================== 收到的訊息是:%v", string(message.Body))
       time.Sleep(time.Duration(20) * time.Second)
       log.Printf("========= 故意睡 20s 測試是否有等consumer處理完才結束")
       return nil
   })
}
NewConsumer完成之後, 宣告stopOnSignalExitNCL 把Consumer收集起來, 透過SnowFlake演算法給他一個不重複的ID
    var (
        wg                  sync.WaitGroup
        stopOnSignalExitNCL = NsqConsumerList{} // 收集停止訊號時停止nsq consumer 的列表
    )
    stopOnSignalExitNCL.Set(snowflakeNode.Generate().Int64(), consumer)
    // 停止部分Nsq Consumer避免有訊息進來
    Logger.Debugf("停止部分Nsq Consumer避免有訊息進來...")
    err = stopOnSignalExitNCL.Each(func(c *nsq.Consumer) error {
        wg.Add(1)
        go func(c *nsq.Consumer, wg *sync.WaitGroup) {
            // 停止訊號會等待正在處理的訊息做完才結束
            c.Stop()
            <-c.StopChan
            wg.Done()
        }(c, &wg)
        return nil
    })
    if err != nil {
        Logger.Errorf("stopOnSignalExitNCL.Each err: %s", err.Error())
    }
    wg.Wait()
    Logger.Debugf("停止 NSQ Consumer完成...")
NsqConsumerList 結構, 丟一個callback到consumer, 等到所有的worker都wg.Done()才能結束
    type NsqConsumerList struct {
        sync.Map
    }
    // Set 存Consumer
    func (ncl *NsqConsumerList) Set(key int64, c *nsq.Consumer) *NsqConsumerList {
        ncl.Store(key, c)
        return ncl
    }
    // Each callback每個Consumer
    func (ncl *NsqConsumerList) Each(callback func(c *nsq.Consumer) error) error {
        ncl.Range(func(k, v interface{}) bool {
            c, ok := v.(*nsq.Consumer)
            if !ok {
                return false
            }
            err := callback(c)
            if err != nil {
                return false
            }
            return true
        })
        return nil
    }
NSQ本身對於收到的訊息並沒有備份的機制, 如果需要額外紀錄下NSQ訊號的歷程資訊, 可以開一個channel在TOPIC底下監聽, 再看要儲存在哪個資料庫。 資料收集之後,假設有一天發生不可預期的嚴重資料遺失問題,至少還有備份可以回放時間歷程。
備份很重要,重要的資料一定要備份。